home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- __revision__ = '$Id: client.py 678 2008-08-08 11:22:14Z jajcus $'
- __docformat__ = 'restructuredtext en'
- import threading
- import logging
- from pyxmpp.clientstream import ClientStream
- from pyxmpp.iq import Iq
- from pyxmpp.presence import Presence
- from pyxmpp.roster import Roster
- from pyxmpp.exceptions import ClientError, FatalClientError
- from pyxmpp.interfaces import IPresenceHandlersProvider, IMessageHandlersProvider
- from pyxmpp.interfaces import IIqHandlersProvider, IStanzaHandlersProvider
-
- class Client:
-
- def __init__(self, jid = None, password = None, server = None, port = 5222, auth_methods = ('sasl:DIGEST-MD5',), tls_settings = None, keepalive = 0):
- self.jid = jid
- self.password = password
- self.server = server
- self.port = port
- self.auth_methods = list(auth_methods)
- self.tls_settings = tls_settings
- self.keepalive = keepalive
- self.stream = None
- self.lock = threading.RLock()
- self.state_changed = threading.Condition(self.lock)
- self.session_established = False
- self.roster = None
- self.stream_class = ClientStream
- if not hasattr(self, 'interface_providers'):
- self.interface_providers = [
- self]
-
- self._Client__logger = logging.getLogger('pyxmpp.Client')
-
-
- def connect(self, register = False):
- if not self.jid:
- raise ClientError, 'Cannot connect: no or bad JID given'
-
- self.lock.acquire()
-
- try:
- stream = self.stream
- self.stream = None
- if stream:
- import common
- common.netcall(stream.close)
-
- self._Client__logger.debug('Creating client stream: %r, auth_methods=%r' % (self.stream_class, self.auth_methods))
- stream = self.stream_class(jid = self.jid, password = self.password, server = self.server, port = self.port, auth_methods = self.auth_methods, tls_settings = self.tls_settings, keepalive = self.keepalive, owner = self)
- stream.process_stream_error = self.stream_error
- self.stream_created(stream)
- stream.state_change = self._Client__stream_state_change
- stream.connect()
- self.stream = stream
- self.state_changed.notify()
- self.state_changed.release()
- except:
- self.stream = None
- self.state_changed.release()
- raise
-
-
-
- def get_stream(self):
- self.lock.acquire()
- stream = self.stream
- self.lock.release()
- return stream
-
-
- def disconnect(self):
- stream = self.get_stream()
- if stream:
- stream.disconnect()
-
-
-
- def request_session(self):
- stream = self.get_stream()
- if not stream.version:
- need_session = False
- elif not stream.features:
- need_session = False
- else:
- ctxt = stream.doc_in.xpathNewContext()
- ctxt.setContextNode(stream.features)
- ctxt.xpathRegisterNs('sess', 'urn:ietf:params:xml:ns:xmpp-session')
- ctxt.xpathRegisterNs('jsess', 'http://jabberd.jabberstudio.org/ns/session/1.0')
- sess_n = None
-
- try:
- sess_n = ctxt.xpathEval('sess:session or jsess:session')
- finally:
- ctxt.xpathFreeContext()
-
- if sess_n:
- need_session = True
- else:
- need_session = False
- if not need_session:
- self.state_changed.acquire()
- self.session_established = 1
- self.state_changed.notify()
- self.state_changed.release()
- self._session_started()
- else:
- iq = Iq(stanza_type = 'set')
- iq.new_query('urn:ietf:params:xml:ns:xmpp-session', 'session')
- stream.set_response_handlers(iq, self._Client__session_result, self._Client__session_error, self._Client__session_timeout)
- stream.send(iq)
-
-
- def request_roster(self):
- stream = self.get_stream()
- iq = Iq(stanza_type = 'get')
- iq.new_query('jabber:iq:roster')
- stream.set_response_handlers(iq, self._Client__roster_result, self._Client__roster_error, self._Client__roster_timeout)
- stream.set_iq_set_handler('query', 'jabber:iq:roster', self._Client__roster_push)
- stream.send(iq)
-
-
- def get_socket(self):
- return self.stream.socket
-
-
- def loop(self, timeout = 1):
- while None:
- stream = self.get_stream()
- if not stream:
- break
-
- act = stream.loop_iter(timeout)
- if not act:
- self.idle()
- continue
- continue
- return None
-
-
- def __session_timeout(self):
- raise FatalClientError('Timeout while tryin to establish a session')
-
-
- def __session_error(self, iq):
- err = iq.get_error()
- msg = err.get_message()
- raise FatalClientError('Failed to establish a session: ' + msg)
-
-
- def __session_result(self, _unused):
- self.state_changed.acquire()
- self.session_established = True
- self.state_changed.notify()
- self.state_changed.release()
- self._session_started()
-
-
- def _session_started(self):
- for ob in self.interface_providers:
- if IPresenceHandlersProvider.providedBy(ob):
- for handler_data in ob.get_presence_handlers():
- self.stream.set_presence_handler(*handler_data)
-
-
- if IMessageHandlersProvider.providedBy(ob):
- for handler_data in ob.get_message_handlers():
- self.stream.set_message_handler(*handler_data)
-
-
- if IIqHandlersProvider.providedBy(ob):
- for handler_data in ob.get_iq_get_handlers():
- self.stream.set_iq_get_handler(*handler_data)
-
- for handler_data in ob.get_iq_set_handlers():
- self.stream.set_iq_set_handler(*handler_data)
-
-
- self.session_started()
-
-
- def __roster_timeout(self):
- raise ClientError('Timeout while tryin to retrieve roster')
-
-
- def __roster_error(self, iq):
- err = iq.get_error()
- msg = err.get_message()
- raise ClientError('Roster retrieval failed: ' + msg)
-
-
- def __roster_result(self, iq):
- q = iq.get_query()
- if q:
- self.state_changed.acquire()
- self.roster = Roster(q)
- self.state_changed.notify()
- self.state_changed.release()
- self.roster_updated()
- else:
- raise ClientError('Roster retrieval failed')
-
-
- def __roster_push(self, iq):
- fr = iq.get_from()
- if fr and fr.bare() != self.jid.bare():
- resp = iq.make_error_response('forbidden')
- self.stream.send(resp)
- raise ClientError('Got roster update from wrong source')
-
- if not self.roster:
- raise ClientError('Roster update, but no roster')
-
- q = iq.get_query()
- items = self.roster.update(q)
- for item in items:
- self.roster_updated(item)
-
- resp = iq.make_result_response()
- self.stream.send(resp)
-
-
- def __stream_state_change(self, state, arg):
- self.stream_state_changed(state, arg)
- if state == 'fully connected':
- self.connected()
- elif state == 'authorized':
- self.authorized()
- elif state == 'disconnected':
- self.state_changed.acquire()
-
- try:
- if self.stream:
- self.stream.close()
-
- self.stream_closed(self.stream)
- self.stream = None
- self.state_changed.notify()
- finally:
- self.state_changed.release()
-
- self.disconnected()
-
-
-
- def idle(self):
- stream = self.get_stream()
- if stream:
- stream.idle()
-
-
-
- def stream_created(self, stream):
- pass
-
-
- def stream_closed(self, stream):
- pass
-
-
- def session_started(self):
- self.request_roster()
- p = Presence()
- self.stream.send(p)
-
-
- def stream_error(self, err):
- self._Client__logger.error('Stream error: condition: %s %r' % (err.get_condition().name, err.serialize()))
-
-
- def roster_updated(self, item = None):
- pass
-
-
- def stream_state_changed(self, state, arg):
- pass
-
-
- def connected(self):
- pass
-
-
- def authenticated(self):
- pass
-
-
- def authorized(self):
- self.request_session()
-
-
- def disconnected(self):
- pass
-
-
-